package org.idea.web.socket.mq;

import lombok.extern.slf4j.Slf4j;
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.consumer.ConsumeFromWhere;
import org.apache.rocketmq.common.protocol.heartbeat.MessageModel;
import org.idea.web.socket.config.MqConsumerConfig;
import org.idea.web.socket.config.MqProducerConfig;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import javax.annotation.Resource;

/**
 * @Author linhao
 * @Date created in 10:34 上午 2021/5/10
 */
@Configuration
@Slf4j
@EnableConfigurationProperties({MqConsumerConfig.class})
public class MqConsumerAutoConfig {

    @Resource
    private MqConsumerConfig mqConsumerConfig;

    @Resource
    //这个接口需要手动实现顺序消费的逻辑 每次获取到消息队列的第一条数据
    private MessageListenerHandler messageListenerConcurrently;

    @Bean
    @ConditionalOnMissingBean
    public DefaultMQPushConsumer defaultMQPushConsumer() {
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer();
        consumer.setNamesrvAddr(mqConsumerConfig.getNameSrvAddr());
        consumer.setConsumerGroup(mqConsumerConfig.getGroupName());
        consumer.setConsumeThreadMin(mqConsumerConfig.getConsumeThreadMin());
        consumer.setConsumeThreadMax(mqConsumerConfig.getConsumeThreadMax());
        consumer.registerMessageListener(messageListenerConcurrently);
        consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
        //消费模型是什么？
        consumer.setMessageModel(MessageModel.BROADCASTING);
        //默认一次拉取一条消费
        consumer.setConsumeMessageBatchMaxSize(mqConsumerConfig.getConsumeMessageBatchMaxSize());
        //*表示订阅所有的tag
        try {
            consumer.subscribe(mqConsumerConfig.getTopics(), "*");
            consumer.start();
            log.info("【 MqConsumerAutoConfig 】mq consumer is started!");
        } catch (Exception e) {
            log.error("mq start fail,e is ", e);
        }
        return consumer;
    }
}
